package com.ai.common.http;

import com.ai.common.http.dto.FluxMessage;
import com.ai.common.http.dto.FluxMessageRemote;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.FluxSink;
@Slf4j
public class SubscriberImpl implements Subscriber<String>, Disposable {

    private final FluxSink<String> emitter;

    private Subscription subscription;

    public SubscriberImpl(FluxSink<String> emitter) {
        this.emitter = emitter;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String data) {
        if ("[DONE]".equals(data)) {
            log.info("Flux返回数据结束了");
            subscription.request(1);
            emitter.next(JSON.toJSONString(new FluxMessage("", true)));
            emitter.complete();
        } else {
            FluxMessageRemote fluxMessageRemote = JSON.parseObject(data, FluxMessageRemote.class);
            if (fluxMessageRemote != null) {
                FluxMessage fluxMessage = new FluxMessage();
                fluxMessage.setContent(fluxMessageRemote.getMsg());
                fluxMessage.setIsEnd(fluxMessageRemote.getIsEnd());
                //返回数据
                emitter.next(JSON.toJSONString(fluxMessage));
            }
            //接收的消息数
            subscription.request(1);
        }
    }

    @Override
    public void onError(Throwable t) {
        log.error("Flux返回数据异常：{}", t.getMessage());
        emitter.error(t);
    }

    @Override
    public void onComplete() {
        log.info("Flux返回数据完成");
        emitter.complete();
    }

    @Override
    public void dispose() {
        log.info("Flux返回数据关闭");
        emitter.complete();
    }

}
